a544a2c9ad0d4551e60bf1d319e75ffe36fee57b,cdap-data-fabric/src/main/java/co/cask/cdap/data/stream/service/DistributedStreamService.java,DistributedStreamService,runOneIteration,#,189

Before Change


    // Trace-log that this service instance is starting a heartbeat-publishing pass.
    LOG.trace("Performing heartbeat publishing in Stream service instance {}", instanceId);
    // Accumulates one entry per stream: stream name -> total reported by the size collector.
    ImmutableMap.Builder<String, Long> sizes = ImmutableMap.builder();
    // Fetch the stream specifications keyed under a NamespaceMeta built for the default namespace id.
    Collection<StreamSpecification> specifications =
      streamMetaStore.listStreams().get(new NamespaceMeta.Builder().setId(Constants.DEFAULT_NAMESPACE).build());
    for (StreamSpecification streamSpec : specifications) {
      // Record the collector's total for this stream under the stream's name.
      sizes.put(streamSpec.getName(), streamWriterSizeCollector.getTotalCollected(streamSpec.getName()));
    }

After Change


  protected void runOneIteration() throws Exception {
    LOG.trace("Performing heartbeat publishing in Stream service instance {}", instanceId);
    ImmutableMap.Builder<String, Long> sizes = ImmutableMap.builder();
    for (StreamSpecification streamSpec : streamMetaStore.listStreams(Constants.DEFAULT_NAMESPACE)) {
      sizes.put(streamSpec.getName(), streamWriterSizeCollector.getTotalCollected(streamSpec.getName()));
    }
    heartbeatPublisher.sendHeartbeat(new StreamWriterHeartbeat(System.currentTimeMillis(), instanceId, sizes.build()));